package com.atguigu.flink.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

/**
 * Created by Smexy on 2023/2/5
 */
public class Demo5_ReadFile
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        //构造一个代表文件系统的对象
        FileSystem fileSystem = new FileSystem().path("data/sensor.txt");

        //提供元数据信息
        Schema schema = new Schema()
            .field("id", DataTypes.STRING())
            .field("ts", DataTypes.BIGINT())
            .field("vc", DataTypes.INT());

        /*
                连接外部设备，读取外部设备的数据，映射为表
         */
        tableEnvironment.connect(fileSystem)
                        //文件的数据格式
                        .withFormat(new Csv())
                        //表的元数据信息
                        .withSchema(schema)
                        .inAppendMode()
                        .createTemporaryTable("t1");


        tableEnvironment.sqlQuery("select * from t1 ")
                        .execute()
                        .print();

    }
}
